package k2platform.common.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


public class ZookeeperService implements IService {

    public static final String ZOOKEEPER_PATH_SEPARATOR = "/";

    public static final Log LOG = LogFactory.getLog(ZookeeperService.class);

    private static Map<String, Set<IWatcher>> WATCHERS = new ConcurrentHashMap<>();

    private Configuration conf;
    private CuratorFramework client;

    public ZookeeperService(String zookeeperUrl) {
        this(zookeeperUrl, Constants.ZOOKEEPER_NAMESPACE);
    }

    public ZookeeperService(String zookeeperUrl, String namespace) {
        this.conf = new Configuration();
        conf.setProperty(ParamNames.ZOOKEEPER_URL, zookeeperUrl);
        conf.setProperty(ParamNames.ZOOKEEPER_NAMESPACE, namespace);
        client = CuratorFrameworkFactory.builder()
                .connectString(conf.getString(ParamNames.ZOOKEEPER_URL))
                .namespace(conf.getString(ParamNames.ZOOKEEPER_NAMESPACE))
                .retryPolicy(new RetryNTimes(Constants.ZOOKEEPER_RETRY_TIME, 1000))
                .connectionTimeoutMs(Integer.MAX_VALUE).build();
    }

    public ZookeeperService(Configuration conf) {
        if (conf.containsKey(ParamNames.ZOOKEEPER_URL) == false)
            LOG.error("Can't find zookeeper.url in configuration. Please check your configuration.");
        else {
            this.conf = conf;
            client = CuratorFrameworkFactory.builder()
                    .connectString(conf.getString(ParamNames.ZOOKEEPER_URL))
                    .namespace(conf.getString(ParamNames.ZOOKEEPER_NAMESPACE))
                    .retryPolicy(new RetryNTimes(Constants.ZOOKEEPER_RETRY_TIME, 1000))
                    .connectionTimeoutMs(Integer.MAX_VALUE).build();
        }
    }

    public static Map<String, Set<IWatcher>> getWatchers() {
        return ZookeeperService.WATCHERS;
    }

    /**
     * Before using zookeeper service, client should start service first.
     **/
    public void start() {
        LOG.debug("Start Zookeeper Service!");
        client.start();
    }

    public boolean isClosed() {
        return client.getState() == CuratorFrameworkState.STOPPED ? true : false;
    }

    public boolean isStarted() {
        return client.getState() == CuratorFrameworkState.STARTED ? true : false;
    }

    public String getNamespace() {
        return conf.getString(ParamNames.ZOOKEEPER_NAMESPACE);
    }

    /**
     * After using zookeeper service, client should close service.
     **/
    public void close() {
        LOG.debug("Close Zookeeper Service!");
        client.close();
    }

    public void register(final String path, final IWatcher watchObj) {
        if (watchObj == null)
            return;
        if (WATCHERS.containsKey(path) == false)
            WATCHERS.put(path, new HashSet<IWatcher>());
        WATCHERS.get(path).add(watchObj);

        if (watchObj instanceof NodeWatcher) {
            final NodeCache node = new NodeCache(client, path);
            try {
                node.start();
            } catch (Exception e) {
                LOG.error("Error to start node cache for " + path);
                e.printStackTrace();
            }

            ((NodeWatcher) watchObj).setNodeCache(node);

            node.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    NodeWatcher tmp = (NodeWatcher) watchObj;
                    tmp.clear();

                    if (tmp.getNodeDataSign()) {
                        tmp.load(node.getCurrentData().getData());
                    } else {
                        tmp.load(ZookeeperService.this.getChildrenData(path));
                    }
                }
            });
        } else if (watchObj instanceof PathWatcher) {
            PathChildrenCache pathCache = new PathChildrenCache(client, path, true);
            try {
                pathCache.start();
            } catch (Exception e) {
                LOG.error("Error to start path cache for " + path);
                e.printStackTrace();
            }

            ((PathWatcher) watchObj).setPathCache(pathCache);

            pathCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {

                    if (event.getData() == null || event.getData().getPath() == null)
                        return;

                    String path = event.getData().getPath();
                    int pos = path.lastIndexOf("/");
                    String childName = path.substring(pos + 1);

                    if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                        ((PathWatcher) watchObj).add(childName, event.getData().getData());
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                        ((PathWatcher) watchObj).remove(childName);
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                        ((PathWatcher) watchObj).update(childName, event.getData().getData());
                    }
                }
            });
        } else if (watchObj instanceof TreeWatcher) {
            TreeCache treeCache = new TreeCache(client, path);
            try {
                treeCache.start();
            } catch (Exception e) {
                LOG.error("Error to start tree cache for " + path);
                e.printStackTrace();
            }
            ((TreeWatcher) watchObj).setTreeCache(treeCache);

            treeCache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {

                    if (event.getData() == null || event.getData().getPath() == null)
                        return;

                    String basePath = ((TreeWatcher) watchObj).getBasePath();
                    int pos = basePath.lastIndexOf("/");
                    String path = event.getData().getPath();
                    String childName = path.substring(pos + 1);

                    if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED)) {
                        ((TreeWatcher) watchObj).add(childName, event.getData().getData());
                    } else if (event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) {
                        ((TreeWatcher) watchObj).remove(childName);
                    } else if (event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
                        ((TreeWatcher) watchObj).update(childName, event.getData().getData());
                    }
                }
            });
        }
    }

    public void unregister(String path, IWatcher watchObj) {
        if (watchObj == null)
            return;

        if (WATCHERS.containsKey(path)) {
            if (WATCHERS.get(path).contains(watchObj)) {
                if (WATCHERS.get(path).size() == 1)
                    WATCHERS.remove(path);
                else
                    WATCHERS.get(path).remove(watchObj);
            }
        }

        if (watchObj instanceof NodeWatcher) {
            try {
                ((NodeWatcher) watchObj).closeNodeCache();
            } catch (IOException e) {
                LOG.error("Error to close Node Watcher on " + path);
                e.printStackTrace();
            }
        } else if (watchObj instanceof PathWatcher) {
            try {
                ((PathWatcher) watchObj).closePathCache();
            } catch (IOException e) {
                LOG.error("Error to close Path Watcher on " + path);
                e.printStackTrace();
            }
        } else if (watchObj instanceof TreeWatcher) {
            try {
                ((TreeWatcher) watchObj).closeTreeCache();
            } catch (IOException e) {
                LOG.error("Error to close Path Watcher on " + path);
                e.printStackTrace();
            }
        }
    }

    public byte[] get(String path) throws Exception {
        if (checkExists(path) == false)
            return null;
        return client.getData().forPath(path);
    }

    public List<String> getChildren(String path) throws Exception {
        return client.getChildren().forPath(path);
    }

    public Map<String, byte[]> getChildrenData(String path) throws Exception {
        if (checkExists(path) == false)
            return null;

        Map<String, byte[]> results = new HashMap<String, byte[]>();
        List<String> children = client.getChildren().forPath(path);
        for (String child : children) {
            results.put(child, client.getData().forPath(path + ZOOKEEPER_PATH_SEPARATOR + child));
        }
        return results;
    }

    public boolean checkExists(String path) throws Exception {
        Stat pstat = client.checkExists().forPath(path);
        if (pstat == null)
            return false;
        else
            return true;
    }

    public void update(String path, byte[] data) throws Exception {
        if (this.checkExists(path)) {
            client.setData().forPath(path, data);
        } else {
            this.create(path, data);
        }
    }

    public void delete(String path) throws Exception {
        if (this.checkExists(path)) {
            client.delete().deletingChildrenIfNeeded().forPath(path);
        }
    }

    public void create(String path, byte[] data) throws Exception {

        if (client.checkExists().forPath(path) != null)
            client.delete().deletingChildrenIfNeeded().forPath(path);
        client.create().creatingParentsIfNeeded().forPath(path, data);
    }

    public void create(String path, Map<String, byte[]> datas) throws Exception {
        for (String child : datas.keySet()) {
            if (client.checkExists().forPath(path + ZOOKEEPER_PATH_SEPARATOR + child) != null)
                client.delete().deletingChildrenIfNeeded().forPath(path + ZOOKEEPER_PATH_SEPARATOR + child);
            client.create().creatingParentsIfNeeded().forPath(path + ZOOKEEPER_PATH_SEPARATOR + child, datas.get(child));
        }
    }
}
